The intention is to take a list of mutations and make a file for NetSurfP
for each of the mutations.
Split the file into files of a reasonable length. Do the same for all the ENSTs in the list.
Call netsurfp with each of the files, saving the responses somewhere sensible, then change each of the responses into a CSV file.
Concatenate the CSV files into two: one for the mutations and one for the wild-type ENSTs.
For each of the mutations, take out the right line from the mutations and from the wild-type ENSTs.
In [1]:
import pandas as pd
import json
import re
import sys
import subprocess
In [2]:
import os
In [11]:
# Load the two lookup tables into module-level names; the Mut class reads
# ENST_codes and ENST_Uniprot as globals.
# NOTE(review): this cell is numbered In [11] while the helpers it calls are
# defined in cells In [3] and In [5] further down, so running the file
# top-to-bottom would call them before they are defined.
ENST_codes = get_ENST_codes()
ENST_Uniprot = get_ENST_Uniprot()
In [3]:
def get_ENST_codes():
    """Load the ENST code table stored in ``./data/ENST_codes.json``.

    The path is resolved against the current working directory.

    Returns:
        The object decoded from the JSON file (presumably a mapping from
        ENST identifier to sequence, since callers use ``.get`` on it).
    """
    codes_path = os.path.abspath('./data/ENST_codes.json')
    with open(codes_path, 'r') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
In [5]:
def get_ENST_Uniprot():
    """Load the ENST-to-UniProt table from ``./data/ENST_Uniprot.csv``.

    ``DataFrame.from_csv`` was removed in pandas 1.0, so the file is read
    with ``read_csv``; ``index_col=0`` keeps the first CSV column as the
    index, which is what ``from_csv`` did by default.  ``from_csv`` also
    tried to parse the index as dates; that is deliberately not replicated
    because the index values are used as ENST identifiers by ``Mut``.

    Returns:
        pandas.DataFrame indexed by the first column of the CSV file.
    """
    table_path = os.path.abspath('./data/ENST_Uniprot.csv')
    return pd.read_csv(table_path, index_col=0)
In [ ]:
In [6]:
def clean_directories():
    """Recreate empty ``temp_questions`` and ``temp_answers`` directories.

    Both directories live in the current working directory.  Anything that
    already exists under those names is deleted first.

    The work is done synchronously in-process: the previous implementation
    launched ``rm -rf`` and ``mkdir`` with ``subprocess.Popen`` without
    waiting for them, so the removal could race with (and undo or block)
    the re-creation.
    """
    import shutil  # local import: only this function needs it

    for dirname in ('temp_questions', 'temp_answers'):
        target = os.path.abspath(dirname)
        if os.path.isdir(target) and not os.path.islink(target):
            shutil.rmtree(target)
        elif os.path.lexists(target):
            # A plain file or symlink is in the way; `rm -rf` removed those too.
            os.remove(target)
        os.makedirs(target)
In [7]:
def Split(string, n):
    r"""Break ``string`` into lines of at most ``n`` characters.

    The chunks are joined with ``\n``; no newline is added after the last
    chunk.  (Previously a string whose length was an exact multiple of ``n``
    gained an empty final chunk, i.e. a trailing newline, which produced a
    blank line in the FASTA output.)

    Args:
        string: the text to wrap.
        n: maximum line length; must be a positive integer.

    Returns:
        The wrapped text; ``''`` for an empty ``string``.

    Raises:
        ValueError: if ``n`` is not positive.
    """
    if n <= 0:
        raise ValueError('n must be a positive integer')
    return '\n'.join(string[start:start + n] for start in range(0, len(string), n))
In [13]:
class Mut:
    """One amino-acid substitution resolved against the ENST lookup tables.

    A mutation code has the form ``identifier_M244V``: ``identifier`` is an
    ENST code or a UniProt accession, ``M`` the wild-type residue, ``244``
    the 1-based position and ``V`` the replacement residue.

    The class reads two module-level globals: ``ENST_Uniprot`` (a DataFrame
    whose index supplies ENST codes and which has the columns
    ``'UniProtKB/Swiss-Prot ID'`` and ``'UniProtKB/TrEMBL ID'``) and
    ``ENST_codes`` (a mapping queried with ``.get(enst, '')``).

    Instance attributes set by ``__init__``:
        mut: the full mutation code as given.
        name, mutation: the parts before and after the first ``_``.
        wild, change, pos: wild-type residue, new residue, 1-based position.
        valid: ``(bool, explanation)`` status tuple taken from ``messages``.
        ENSTs: candidate ENST codes for ``name``.
        ENST, wild_code: the first candidate whose sequence has ``wild`` at
            ``pos`` and that sequence; both ``''`` when nothing matched.
        mutant_code: ``wild_code`` with the substitution applied, or ``''``.
    """

    # Status tuples (is_valid, explanation).  The 'too short' text is compared
    # verbatim by split_validity, so it must not be reworded.
    messages = {
        'ok': (True, 'no problems encountered'),
        'no ENST': (False, 'no ENSTs correspond to this Uniprot code'),
        'too short': (False, "none of the corresponding codes were long enough to encorporate this " +
                      "mutation"),
        # The original concatenation dropped the space before "encorporate".
        'wrong wild type': (False, "whilst at least one of the corresponding codes was long enough to " +
                            "encorporate this mutation the AA did not correspond to the wild type given"),
    }

    def __init__(self, mut):
        """Parse ``mut`` and resolve it to an ENST sequence.

        Raises:
            IndexError: if ``mut`` contains no ``_``.
            ValueError: if the position part is not an integer.
        """
        self.mut = mut
        parts = mut.split('_')
        self.name = parts[0]
        self.mutation = parts[1]
        self.wild = self.mutation[0]
        self.change = self.mutation[-1]
        self.pos = int(self.mutation[1:-1])
        self.valid, self.ENSTs = self.get_ENSTs()
        if self.valid[0]:
            self.valid, self.ENST, self.wild_code = self.get_code()
        else:
            # Keep the "no ENST" status instead of letting get_code replace it
            # with a misleading "too short".
            self.ENST, self.wild_code = '', ''
        self.mutant_code = self.mutate_code()

    def get_ENSTs(self):
        """Return ``(status, ENST codes)`` for ``self.name``.

        A name starting with ``ENST`` is taken as its own single candidate.
        Otherwise the Swiss-Prot column and then the TrEMBL column of
        ``ENST_Uniprot`` are searched, and the index values of the matching
        rows are returned.
        """
        name = self.name
        if name[:4] == 'ENST':
            return (self.messages['ok'], [name])
        for column in ('UniProtKB/Swiss-Prot ID', 'UniProtKB/TrEMBL ID'):
            matches = ENST_Uniprot[ENST_Uniprot[column] == name]
            if len(matches):
                return (self.messages['ok'], list(matches.index))
        return (self.messages['no ENST'], [])

    def get_code(self):
        """Return ``(status, ENST, sequence)`` for the first usable candidate.

        A candidate is usable when its sequence reaches ``self.pos`` and the
        residue there equals ``self.wild``.  If none is usable, the status
        says whether every sequence was too short or the residue differed,
        and the other two items are ``''``.
        """
        length_ok = False
        for enst in self.ENSTs:
            code = ENST_codes.get(enst, '')
            # Positions are 1-based; a position below 1 can never be matched.
            if 1 <= self.pos <= len(code):
                length_ok = True
                if code[self.pos - 1] == self.wild:
                    return (self.messages['ok'], enst, code)
        key = 'wrong wild type' if length_ok else 'too short'
        return (self.messages[key], '', '')

    def mutate_code(self):
        """Return the wild-type sequence with the substitution applied.

        Returns ``''`` when no wild-type sequence was resolved, rather than a
        one-letter sequence consisting only of the new residue.
        """
        if not self.wild_code:
            return ''
        return self.wild_code[:self.pos - 1] + self.change + self.wild_code[self.pos:]

    def for_printing(self):
        """Return ``(FASTA header, sequence wrapped at 61 columns)``."""
        return ('>{0}_{1}'.format(self.ENST, self.mut), Split(self.mutant_code, 61))
In [14]:
# Scratch instance for interactive inspection; it is bound to the name `self`,
# presumably so that method bodies can be pasted and run at top level.
self = Mut('P00519_M244V')
In [ ]:
def main():
    """Run the query-building pipeline from start to finish.

    Loads the ENST lookup tables, resets the temporary directories and
    builds the NetSurfP question files.
    """
    # Mut looks these tables up as module-level globals, so binding them as
    # locals here (as before) left them invisible to it.
    global ENST_codes, ENST_Uniprot
    ENST_codes = get_ENST_codes()
    ENST_Uniprot = get_ENST_Uniprot()
    clean_directories()
    # The result is not unpacked: nothing here uses it, and unpacking fails
    # if the callee returns None.  The summary is written to
    # ./temp_answers/query.json by make_NetSurfP_query itself.
    make_NetSurfP_query()
In [34]:
def make_NetSurfP_query():
    """Build the NetSurfP question files for the user's mutation list.

    Asks for the mutation file (``get_query``), resolves every code with
    ``Mut``, writes the mutant sequences of the resolvable mutations to
    ``./temp_questions/questions<N>.fsa`` in chunks of fewer than 10000
    characters, and records a summary in ``./temp_answers/query.json``.

    Returns:
        ``(fine, too_short, wrong_wild)``: three lists of mutation codes as
        classified by ``split_validity``.
    """
    muts = get_query()
    mutations = [Mut(code) for code in muts]
    # Key by the full mutation code: keying by the protein name made several
    # mutations of the same protein overwrite each other.
    validity = {m.mut: m.valid for m in mutations}
    # Only mutations that resolved to a sequence are worth sending to NetSurfP.
    for_printing = [m.for_printing() for m in mutations if m.valid[0]]
    temp_lists = dont_exceed_max(10000, for_printing)
    make_questions('./temp_questions/', 'questions', temp_lists)
    mutations_listed = [[entry[0] for entry in chunk] for chunk in temp_lists]
    fine, too_short, wrong_wild = split_validity(validity)
    query = {'fine': fine,
             'too short': too_short,
             'wrong wild': wrong_wild,
             'mutations for netsurfp': mutations_listed}
    with open('./temp_answers/query.json', 'w') as file:
        json.dump(query, file)
    # Callers unpack three lists from this function; previously nothing was returned.
    return fine, too_short, wrong_wild
In [23]:
def split_validity(validity):
    """Sort the keys of ``validity`` into three lists by their status.

    Args:
        validity: mapping from key to a ``(flag, message)`` pair.

    Returns:
        ``(fine, too_short, wrong_wild)``: keys whose flag is truthy; keys
        whose message is exactly the "too short" text; and all remaining
        keys.  Each list follows the iteration order of ``validity``.
    """
    short_text = 'none of the corresponding codes were long enough to encorporate this mutation'
    fine, too_short, wrong_wild = [], [], []
    for key, (flag, message) in validity.items():
        if flag:
            bucket = fine
        elif message == short_text:
            bucket = too_short
        else:
            bucket = wrong_wild
        bucket.append(key)
    return (fine, too_short, wrong_wild)
In [ ]:
def make_questions(pathname, filename, temp_lists):
    """Write each chunk of ``temp_lists`` to its own ``.fsa`` file.

    Chunk number ``k`` goes to ``pathname + filename + str(k) + '.fsa'``,
    replacing any existing file.  Every ``(header, sequence)`` pair in the
    chunk is written as two newline-terminated entries.

    Args:
        pathname: directory prefix, used verbatim (include the trailing separator).
        filename: base name placed before the chunk number.
        temp_lists: list of chunks, each a list of ``(header, sequence)`` pairs.
    """
    for index, chunk in enumerate(temp_lists):
        target = pathname + filename + str(index) + '.fsa'
        with open(target, 'w') as handle:
            for header, sequence in chunk:
                handle.write(header + '\n')
                handle.write(sequence + '\n')
In [30]:
# Scratch: keep only the first element (the FASTA header) of every entry in each chunk.
mutations_listed=[[i[0] for i in j] for j in temp_lists]
In [31]:
mutations_listed[0]
Out[31]:
In [25]:
# Scratch: the first steps of make_NetSurfP_query run at top level, so the
# intermediate values stay available for inspection.
muts = get_query()
mutations =[Mut(l) for l in muts]
# NOTE(review): keyed by m.name, so two mutations sharing a name keep only the last status.
validity = dict(zip([m.name for m in mutations],[m.valid for m in mutations]))
for_printing = [m.for_printing() for m in mutations]
temp_lists = dont_exceed_max(10000,for_printing)
In [ ]:
In [ ]:
def use_netsurfp():
In [ ]:
# Scratch: read a question file back as a list of lines.
# NOTE(review): make_questions names its files '<filename><N>.fsa'; this path
# has no '.fsa' suffix — confirm which file was meant.
with open('temp_questions/questions0', 'r') as file:
f=file.readlines()
In [ ]:
# Scratch: write the first two lines held in `f` to questions.fsa.
# The file is first truncated by opening it in 'w' mode, then reopened in
# append mode for the actual writes.
with open('temp_questions/questions.fsa','w') as file:
file.write('')
with open('temp_questions/questions.fsa','a') as file:
for i in f[:2]:
file.write(i)
In [ ]:
# Scratch: run the external `netsurfp` program on the first question file and
# block until it exits (communicate() waits for the process).
p = subprocess.Popen(['netsurfp', '-i', 'temp_questions/questions0.fsa', '-o', 'temp_answers/answers0.rsa'])
p.communicate()
In [ ]:
f[:2]
In [ ]:
os.listdir()
In [ ]:
os.system("netsurfp -h")
In [ ]:
# Scratch: start netsurfp without waiting for it to finish.
# NOTE(review): the input path lacks the '.fsa' suffix that make_questions
# appends, and the output name has no extension — confirm both.
subprocess.Popen(['netsurfp','-i','./temp_questions/questions0','-o','./temp_answers/answers'])
In [ ]:
# Scratch: classify the statuses in the top-level `validity` dict into three lists.
fine, too_short,wrong_wild = split_validity(validity)
In [ ]:
subprocess.Popen(['pwd'])
In [ ]:
# Scratch: run the whole query-building step.
# NOTE(review): the validity dict is a local variable inside
# make_NetSurfP_query, so this name does not receive that dict — confirm what
# the call actually returns before using `validity`.
validity = make_NetSurfP_query()
In [ ]:
In [ ]:
In [21]:
def get_query():
    """Ask the user for a mutation-code file and return the codes it holds.

    Prints usage instructions, prompts for a path on standard input and
    reads that file as one mutation code per line.

    Returns:
        The codes in file order, with surrounding whitespace removed and
        blank lines skipped (a blank line would otherwise reach ``Mut`` as
        an empty code).  An empty list if the file does not exist.
    """
    print('To use this program you need to supply a file with a list of mutation codes\n',
          'These codes should be in the form of identifier_M244V where here\n',
          'M is the wild type 244 is the position and V is the mutant amino acid\n',
          'Your file should contain one mutation code per line and no other information\n')
    query_file = input('please type the full path of the file that contains your mutation codes here without quotations marks')
    # Only the file access can raise FileNotFoundError, so keep the try body minimal.
    try:
        with open(query_file, 'r') as file:
            lines = file.readlines()
    except FileNotFoundError:
        print('file not found, quit and try again')
        return []
    print('Your query has been found')
    stripped = (line.strip() for line in lines)
    return [code for code in stripped if code]
In [36]:
def dont_exceed_max(Max, code_list):
    """Group ``code_list`` into consecutive chunks of limited total length.

    Items are taken in order.  An item joins the current chunk while the
    summed length of the chunk's second elements stays strictly below
    ``Max``; otherwise it starts a new chunk.  An item that reaches ``Max``
    on its own gets a chunk to itself.

    Empty chunks are never produced.  Previously an oversized first item
    caused a leading empty chunk and an empty ``code_list`` gave ``[[]]``;
    either way an empty question file was written later.

    Args:
        Max: exclusive upper bound on the summed length per chunk.
        code_list: sequence of ``(header, sequence)`` pairs.

    Returns:
        A list of chunks, each a non-empty list of pairs from ``code_list``.
    """
    temp_list = []
    for_inclusion = []
    limit = 0
    for item in code_list:
        _, sequence = item
        size = len(sequence)
        # Close the running chunk only if it already holds something.
        if for_inclusion and limit + size >= Max:
            temp_list.append(for_inclusion)
            for_inclusion = []
            limit = 0
        for_inclusion.append(item)
        limit += size
    if for_inclusion:
        temp_list.append(for_inclusion)
    return temp_list
In [20]:
# Scratch: build the Mut objects and their printable (header, sequence) pairs.
muts = get_query()
mutations =[Mut(l) for l in muts]
# NOTE(review): this cell binds `Validity` (capital V) while other cells read
# `validity` — confirm which name is intended.
Validity = dict(zip([m.name for m in mutations],[m.valid for m in mutations]))
for_printing = [m.for_printing() for m in mutations]
In [ ]:
# Scratch: chunk with a limit of 100000; make_NetSurfP_query passes 10000.
temp_lists = dont_exceed_max(100000,for_printing)
In [ ]:
temp_lists[0]
In [ ]:
self.for_printing()
A bit of code that generates a file of mutation codes to give me something to play with.
In [ ]:
# Build ./data/codes.txt, a sample query file with one '<Uniprot ID>_<Substitution>'
# code per line, from OGvNeutral.csv.
# DataFrame.from_csv was removed in pandas 1.0; read_csv with index_col=0 keeps
# the first column as the index, as from_csv did.
codes = pd.read_csv('OGvNeutral.csv', index_col=0)
codes['codes'] = codes['Uniprot ID'] + '_' + codes['Substitution']
L = list(codes['codes'])
# Keep only real strings; presumably the other entries are NaN from rows with
# a missing ID or substitution.
L1 = [i for i in L if isinstance(i, str)]
# One 'w' open both truncates and writes; the separate truncate step was redundant.
with open('./data/codes.txt', 'w') as file:
    for l in L1:
        file.write(l + '\n')
In [ ]:
def
In [ ]:
for_printing[:10]
In [ ]:
self.mutant_code[243]
In [ ]:
self.ENST
In [ ]:
# NOTE(review): Mut.__init__ sets `wild_code` and `mutant_code` but no `code`
# attribute, so this lookup looks like it raises AttributeError.
self.code
In [ ]: